Product
Socket Now Supports uv.lock Files
Socket now supports uv.lock files to ensure consistent, secure dependency resolution for Python projects and enhance supply chain security.
The 'rhea' npm package is a high-level AMQP 1.0 client library for Node.js. It provides a comprehensive set of features for building messaging applications, including sending and receiving messages, managing connections, and handling various messaging patterns.
Sending Messages
This feature allows you to send messages to an AMQP broker. The code sample demonstrates how to connect to a broker, open a sender link, and send a message.
const rhea = require('rhea');

// Connect to a broker listening on localhost:5672.
const connection = rhea.connect({ host: 'localhost', port: 5672 });
connection.open_sender('examples');
// 'sendable' is raised once the sender has credit to send.
connection.on('sendable', function (context) {
    context.sender.send({ body: 'Hello, World!' });
    connection.close();
});
Receiving Messages
This feature allows you to receive messages from an AMQP broker. The code sample demonstrates how to connect to a broker, open a receiver link, and handle incoming messages.
const rhea = require('rhea');

// Connect to a broker listening on localhost:5672.
const connection = rhea.connect({ host: 'localhost', port: 5672 });
connection.open_receiver('examples');
// 'message' is raised for each message received on the link.
connection.on('message', function (context) {
    console.log('Received message:', context.message.body);
    connection.close();
});
Managing Connections
This feature allows you to manage connections to an AMQP broker. The code sample demonstrates how to handle connection open and close events.
const rhea = require('rhea');
const connection = rhea.connect({ host: 'localhost', port: 5672 });
connection.on('connection_open', function (context) {
    console.log('Connection opened');
});
connection.on('connection_close', function (context) {
    console.log('Connection closed');
});
The 'amqp10' package is another AMQP 1.0 client for Node.js. It provides functionality similar to 'rhea' for sending and receiving messages, but 'rhea' is often preferred for its more comprehensive feature set and better performance.
The 'amqplib' package is an AMQP 0.9.1 client for Node.js. Although it targets a different version of the AMQP protocol, it offers similar functionality for messaging applications. 'rhea' is more suitable for users who need AMQP 1.0 support.
A reactive library for the AMQP protocol, for easy development of both clients and servers.
Brief example of sending and receiving a message through a broker/server listening on port 5672:
var container = require('rhea');
container.on('message', function (context) {
    console.log(context.message.body);
    context.connection.close();
});
container.once('sendable', function (context) {
    context.sender.send({body:'Hello World!'});
});
var connection = container.connect({'port':5672});
connection.open_receiver('examples');
connection.open_sender('examples');
output:
Hello World!
There are some examples of using the library under the examples folder. These include:
helloworld.js - essentially the code above, which sends and receives a single message through a broker
direct_helloworld.js - an example showing the sending of a single message without the use of a broker, by listening on a port and then opening a connection to itself over which the message is transferred.
send_raw.js - explicitly set the data section of the message body
simple_send.js - connects to a specified port then sends a number of messages to a given address
simple_recv.js - connects to a specified port then subscribes to receive a number of messages from a given address
These last two can be used together to demonstrate sending messages from one process to another, using a broker or similar intermediary to which they both connect.
The direct_recv.js example can be used in conjunction with the simple_send.js example to demonstrate sending messages between processes without the use of any intermediary. Note, however, that the default port of one or the other will need to be changed through the -p command line option.
In durable_subscription, a subscriber and a publisher which demonstrate the notion of a durable subscription when used in conjunction with a broker such as ActiveMQ.
In selector, a receiver that uses a selector (a SQL-like query string that restricts the set of messages delivered) and an accompanying sender.
In sasl, a SASL client showing how to authenticate to the service you connect to. This can be used against any broker as well as either of two example servers showing the anonymous and plain mechanisms.
A TLS client and server demonstrating connecting (and possibly authenticating) over a TLS secured socket.
A client to demonstrate the built-in automatic reconnection functionality, along with a simple echo server against which it can be run. It can of course also be run against a broker instead (or as well!).
Both node based and web based websocket clients, along with a server which will echo back any requests received. The clients can also be used against a websocket enabled AMQP broker with a queue or topic called 'examples'. The node based scripts require the 'ws' node module to be installed. The browser based example requires a browserified version of the rhea library (this can be created e.g. by calling npm run-script browserify or make browserify). The browserified and minimized javascript library is stored under the dist/ directory.
To run the examples you will need the dependencies installed: the library itself depends on the 'debug' module, and some of the examples depend on the 'yargs' module for command line option parsing.
The 'rhea' module itself must also be findable by node. You can do this either by checking out the code from git and setting NODE_PATH to include the directory to which you do so (i.e. the directory in which a directory named 'rhea' can be found), or by installing the module using npm.
Some of the examples assume an AMQP compatible broker, such as those offered by the ActiveMQ or Qpid Apache projects, is running.
There are five core types of object in the API: Container, Connection, Session, Receiver and Sender.
Each of these inherits all the methods of EventEmitter, allowing handlers for particular events to be attached. Events that are not handled at sender or receiver scope are then propagated up to possibly be handled at session scope. Events that are not handled at session scope are then propagated up to possibly be handled at connection scope, and if not there then in container scope.
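The propagation rule described above can be pictured with a small toy model. This is only a sketch built on Node's EventEmitter to illustrate the idea; the Scope class and its dispatch method are hypothetical and are not rhea's own classes.

```javascript
const EventEmitter = require('events');

// Toy model of scoped event handling; NOT rhea's actual classes.
class Scope extends EventEmitter {
    constructor(name, parent) {
        super();
        this.name = name;
        this.parent = parent || null;
    }

    // Deliver the event here if a handler exists, otherwise pass it upward.
    // Returns the name of the scope that handled it, or null if none did.
    dispatch(event, context) {
        if (this.listenerCount(event) > 0) {
            this.emit(event, context);
            return this.name;
        }
        return this.parent ? this.parent.dispatch(event, context) : null;
    }
}

const container = new Scope('container');
const connection = new Scope('connection', container);
const session = new Scope('session', connection);
const receiver = new Scope('receiver', session);

const seen = [];
connection.on('message', (context) => seen.push('connection saw ' + context.body));

// No handler on the receiver or session, so the connection handles it.
const handledBy = receiver.dispatch('message', { body: 'hello' });
console.log(handledBy, seen);
```

In practice this means a single handler attached to the container or connection can serve every link beneath it, while a handler attached to one sender or receiver takes precedence for that link only.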
Two other relevant objects are: Message and Delivery.
An AMQP container from which outgoing connections can be made and/or to which incoming connections can be accepted. The module exports a default instance of a Container which can be used directly. Other instances can be created from that if needed using the create_container method. A container is identified by the id property. By default a uuid is used, but the property can be set to something more specific if desired before making or accepting any connections.
Connects to the server specified by the host and port supplied in the options and returns a Connection.
The options argument is an object that may contain any of the following fields:
If the transport is TLS, the options may additionally specify a 'servername' property. This allows the SNI to be controlled separately from the host option. If servername is not specified, the SNI will default to the host. If using TLS, options for 'ca', 'cert' and 'key' may also be specified (see https://nodejs.org/api/tls.html#tls_tls_createsecurecontext_options).
If options is undefined, the client will attempt to obtain default options from a JSON config file. This file is of similar structure to that used by Apache Qpid Proton clients. The location of the file can be specified through the MESSAGING_CONNECT_FILE environment variable. If that is not specified, it will look for a file called connect.json in the current directory, in <home>/.config/messaging or in /etc/messaging/.
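The lookup order can be sketched as a small helper. This is an illustration of the description above rather than the library's own code: connectFileCandidates is a hypothetical name, and placing the per-user file under the home directory is an assumption.

```javascript
const os = require('os');
const path = require('path');

// Returns candidate config file locations in the order described above.
// Assumption: the per-user location lives under the user's home directory.
function connectFileCandidates(env, homeDir) {
    // An explicit MESSAGING_CONNECT_FILE takes the place of the default locations.
    if (env.MESSAGING_CONNECT_FILE) {
        return [env.MESSAGING_CONNECT_FILE];
    }
    return [
        path.join(process.cwd(), 'connect.json'),
        path.join(homeDir, '.config', 'messaging', 'connect.json'),
        path.join('/etc', 'messaging', 'connect.json'),
    ];
}

console.log(connectFileCandidates(process.env, os.homedir()));
```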
The config file offers only limited configurability, specifically:
Starts a server socket listening for incoming connections on the port (and optionally interface) specified in the options.
The options argument is an object that may contain any of the following fields:
Returns a new container instance. The method takes an options object which can contain the following field:
If no id is specified a new uuid will be generated.
Simple utility for generating a stringified uuid, useful if you wish to specify distinct container ids for different connections.
Returns a function that can be used to create another function suitable for use as the value of 'connection_details' in a connect call in order to connect over websockets. The function returned here takes a websocket url and optional arguments. The websocket_connect method itself takes the constructor of the WebSocket implementation to use. It has been tested with the implementation in Firefox and also that in the node module 'ws'.
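The three steps described above might be wired together as follows. This is a sketch under assumptions: connectOverWebSocket is a hypothetical helper, the container argument is expected to be the rhea module (or a Container created from it), and the subprotocol list passed as the optional argument is an assumption that may need adjusting for a given broker.

```javascript
// `container` is expected to be the rhea module or a Container created from it;
// `WebSocketImpl` is a WebSocket constructor, e.g. the one exported by the 'ws' module.
function connectOverWebSocket(container, WebSocketImpl, url) {
    // Step 1: hand the WebSocket constructor to websocket_connect.
    const ws = container.websocket_connect(WebSocketImpl);
    // Step 2: call the returned function with the url and optional arguments
    // (here a list of subprotocols, which is an assumption).
    const details = ws(url, ['binary', 'AMQPWSB10', 'amqp']);
    // Step 3: use the result as the value of connection_details.
    return container.connect({ connection_details: details });
}

// Possible usage (requires the 'rhea' and 'ws' modules; the url is a placeholder):
// const connection = connectOverWebSocket(require('rhea'), require('ws'), 'ws://localhost:5673');
```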
Used to start handling an incoming websocket connection as an AMQP connection. See the websocket echo server example for how to use it.
Establishes a link over which messages can be received and returns a Receiver representing that link. A receiving link is a subscription, i.e. it expresses a desire to receive messages.
The argument to this method can either be a simple string indicating the source of messages of interest (e.g. a queue name), or an options object that may contain any of the following fields:
Note: If the link doesn't specify a value for the credit_window and autoaccept options, the connection options are consulted followed by the container options. The default is used only if an option is not specified at any level.
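The fallback order in the note above can be illustrated with a short helper. This is a sketch of the rule only, not the library's implementation; resolveOption is a hypothetical function and the numeric values are placeholders.

```javascript
// Illustrative helper (not part of rhea): resolve one option by checking the
// link options, then the connection options, then the container options,
// and only then falling back to the default.
function resolveOption(name, linkOptions, connectionOptions, containerOptions, defaultValue) {
    for (const scope of [linkOptions, connectionOptions, containerOptions]) {
        if (scope && scope[name] !== undefined) {
            return scope[name];
        }
    }
    return defaultValue;
}

// The link does not set credit_window, so the connection-level value is used.
const creditWindow = resolveOption('credit_window', { autoaccept: false }, { credit_window: 10 }, { credit_window: 500 }, 1000);
// autoaccept is set on the link itself, so that value is used even though it is false.
const autoaccept = resolveOption('autoaccept', { autoaccept: false }, {}, {}, true);
console.log(creditWindow, autoaccept);
```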
Establishes a link over which messages can be sent and returns a Sender representing that link. A sending link is an analogous concept to a subscription for outgoing rather than incoming messages, i.e. it expresses a desire to send messages.
The argument to this method can either be a simple string indicating the target for messages of interest (e.g. a queue name), or an options object that may contain any of the following fields:
Note: If the link doesn't specify a value for the autosettle option, the connection options are consulted followed by the container options. The default is used only if an option is not specified at any level.
Sends the specified message over the default sender, which is a sending link whose target address is null. The use of this method depends on the peer supporting so-called 'anonymous relay' semantics, which most AMQP 1.0 brokers do. The message should have the 'to' field set to the intended destination.
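A minimal sketch of sending through the default sender, assuming the peer supports anonymous relay. The sendTo helper and the address used in the usage comment are hypothetical.

```javascript
// `connection` is expected to be a rhea Connection; `address` is a destination name
// known to the peer (hypothetical in this sketch).
function sendTo(connection, address, body) {
    // The default sender has no target address of its own, so the 'to' field
    // tells the peer where to route the message.
    return connection.send({ to: address, body: body });
}

// Possible usage (requires the 'rhea' module and a broker with anonymous relay support):
// const connection = require('rhea').connect({ port: 5672 });
// sendTo(connection, 'examples', 'Hello World!');
```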
Closes a connection (may take an error object which is an object that consists of condition and description fields).
Provide information about the connection status, i.e. whether it is open or closed.
Creates a new session if you want to manage sessions by yourself.
Raised when the remote peer indicates the connection is open. This occurs also on reconnect.
Raised when the remote peer indicates the connection is closed. This can happen either as a response to our close, or on the peer's own initiative. The connection and sessions will not be reconnected.
Raised when the remote peer indicates the connection is closed and specifies an error. A connection_close event will always follow this event, so it only needs to be implemented if there are specific actions to be taken on a close with an error as opposed to a close. The error is available as a property on the event context.
If neither the connection_error nor the connection_close is handled by the application, an error event will be raised. This can be handled on the connection or the container. If this is also unhandled, the application process will exit.
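Handling both events might look like the sketch below. The watchConnectionClose helper and its log callback are hypothetical, and exactly where the error object lives is treated as an assumption, so the code checks both the context and the connection.

```javascript
// `connection` is expected to be a rhea Connection (any EventEmitter works for this sketch);
// `log` is a hypothetical callback that receives one line of text.
function watchConnectionClose(connection, log) {
    connection.on('connection_error', (context) => {
        // Assumption: the error may be exposed on the context or on the connection object.
        const error = context.error || (context.connection && context.connection.error);
        log('closed with error: ' + (error && error.condition));
    });
    // connection_close follows connection_error, so general cleanup belongs here.
    connection.on('connection_close', () => {
        log('closed');
    });
}
```

Because a connection_close handler is registered, the fallback error event described above is not raised for a remote close.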
Raised when a protocol error is received on the underlying socket. A disconnected event will follow with any reconnect as configured.
Raised when an error is received on the underlying socket. This catches any errors otherwise not handled.
Raised when the underlying tcp connection is lost or a nonfatal error was received. The context has a reconnecting property which is true if the library is attempting to automatically reconnect and false if it has reached the reconnect limit. If reconnect has not been enabled or if the connection is a tcp server, then the reconnecting property is undefined. The context may also have an error property giving some information about the reason for the disconnect. If the disconnect event is not handled, a warning will be logged to the console.
You should update the application state so that any unsettled messages are resent once the connection is recovered.
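One way to keep that application state is to remember each message until the peer settles it. The sketch below is illustrative bookkeeping, not part of rhea: UnsettledTracker is a hypothetical class, and it relies only on send() returning a delivery object.

```javascript
// Illustrative bookkeeping (not part of rhea): remember messages until they are settled.
class UnsettledTracker {
    constructor() {
        // Maps each delivery object returned by send() to the message it carried.
        this.pending = new Map();
    }

    // Send a message and remember it until it is settled.
    send(sender, message) {
        const delivery = sender.send(message);
        this.pending.set(delivery, message);
        return delivery;
    }

    // Call from an 'accepted' (or other outcome) handler with context.delivery.
    settled(delivery) {
        this.pending.delete(delivery);
    }

    // Call once the connection has recovered: resend everything still outstanding.
    resendAll(sender) {
        const messages = Array.from(this.pending.values());
        this.pending.clear();
        messages.forEach((message) => this.send(sender, message));
    }
}

// Possible wiring, assuming `sender` and `connection` are rhea objects:
// sender.on('accepted', (context) => tracker.settled(context.delivery));
// connection.on('connection_open', () => tracker.resendAll(sender));
```

Resending in this way can produce duplicates if the peer received a message before the connection dropped, so receivers may need to tolerate repeated messages.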
Raised when the remote peer settles the message.
Session is an aggregation of Receiver and Sender links and provides the context and sequencing of messages for all the links it contains. A Connection creates a default session for you if you create receivers and senders on the Connection. You only need to use this object if you want to group your links into more than one session.
This adds a receiver on the session. The open_receiver on the Connection object finds the session and calls this.
This adds a sender on the session. The open_sender on the Connection object finds the session and calls this.
End a session (may take an error object which is an object that consists of condition and description fields).
Provide information about the session status, i.e. whether it is open or closed.
Raised when the remote peer indicates the session is open (i.e. begun in AMQP parlance).
Raised when the remote peer indicates the session is closed (i.e. ended in AMQP parlance). The session will be removed from the connection after the event.
Raised when the remote peer indicates the session has ended and specifies an error. A session_close event will always follow this event, so it only needs to be implemented if there are specific actions to be taken on a close with an error as opposed to a close. The error is available as the error property on the session object.
If neither the session_error nor the session_close is handled by the application, an error event will be raised on the container. If this is also unhandled, the application process will exit.
Raised when the remote peer settles the message.
Closes a receiving link (i.e. cancels the subscription). (May take an error object which is an object that consists of condition and description fields).
Detaches a link without closing it. For durable subscriptions this means the subscription is inactive, but not cancelled.
By default, receivers have a prefetch window that is moved automatically by the library. However, if desired, the application can set the prefetch to zero and manage credit itself. Each invocation of the add_credit() method issues credit for a further 'n' messages to be sent by the peer over this receiving link. [Note: flow() is an alias for add_credit()]
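Manual credit management might look like the sketch below, which grants credit in fixed-size batches. It assumes the receiver was opened with its credit_window option set to 0; receiveInBatches and the handle callback are hypothetical.

```javascript
// `receiver` is expected to be a rhea Receiver opened with credit_window set to 0
// (assumption); `handle` is a hypothetical callback invoked with each message.
function receiveInBatches(receiver, batchSize, handle) {
    let remaining = batchSize;
    receiver.on('message', (context) => {
        handle(context.message);
        remaining -= 1;
        if (remaining === 0) {
            // The whole batch has been consumed: issue credit for another batch.
            remaining = batchSize;
            receiver.add_credit(batchSize);
        }
    });
    // Issue the initial credit so the peer can start sending.
    receiver.add_credit(batchSize);
}
```

Depending on the application, it may be preferable to issue the first credit from a receiver_open handler instead of immediately.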
Returns the amount of outstanding credit that has been issued.
Raised when a message is received. The context passed will have a message, containing the received content, and a delivery which can be used to acknowledge receipt of the message if autoaccept has been disabled.
Raised when the remote peer indicates the link is open (i.e. attached in AMQP parlance).
Raised when the remote peer indicates that it has drained all credit (and therefore there are no more messages at present that it can send).
Raised when a flow is received for receiver.
Raised when the remote peer closes the receiver with an error. A receiver_close event will always follow this event, so it only needs to be implemented if there are specific actions to be taken on a close with an error as opposed to a close. The error is available as an error property on the receiver.
Raised when the remote peer indicates the link is closed (i.e. detached in AMQP parlance).
Raised when the remote peer settles the message.
Sends a message. The link need not yet be open, nor is any credit needed, but there is a limit of 2048 deliveries in the Session queue before it raises an exception for buffer overflow.
Unsettled messages, whether transmitted or not, are lost on reconnect and there will be no accepted, released or rejected events. You may need to resend the messages on a disconnected event.
If the messages to be sent can be generated or fetched on demand, or there is a large number of messages, it is recommended that send is called only while the sender is sendable(). When the sender is no longer sendable, continue sending in the sendable event.
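The recommended pattern of sending only while the sender is sendable can be sketched as a small pump. sendWhileSendable and the next callback are hypothetical names; only sendable(), send() and the sendable event come from the description above.

```javascript
// `sender` is expected to be a rhea Sender; `next` is a hypothetical function that
// returns the next message to send, or undefined when there is nothing left.
function sendWhileSendable(sender, next) {
    let done = false;

    function pump() {
        // Only fetch a message when the sender can actually take it.
        while (!done && sender.sendable()) {
            const message = next();
            if (message === undefined) {
                done = true;
                break;
            }
            sender.send(message);
        }
    }

    // Resume whenever the peer grants more credit.
    sender.on('sendable', pump);
    pump();
}
```

Fetching messages lazily in this way keeps the number of queued deliveries bounded by the credit the peer has granted, which helps stay clear of the session queue limit mentioned above.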
Closes a sending link (may take an error object which is an object that consists of condition and description fields).
Detaches a link without closing it.
Returns true if the sender has available credits for sending a message. Otherwise it returns false.
This must be called in response to a sender_draining event to tell the peer we have drained our messages or credit.
Raised when the sender has received credit to be able to transmit messages to its peer. You will not receive a new event until the peer sends more credit, even if you have some credit left.
Raised when a sent message is accepted by the peer.
Raised when a sent message is released by the peer.
Raised when a sent message is rejected by the peer. context.delivery.remote_state.error may carry diagnostics to explain the rejection, for example a condition property with value amqp:unauthorized-access.
Raised when a sent message is modified by the peer. The context.delivery.remote_state may have delivery_failed and undeliverable_here boolean properties and a message_annotations map property to guide any message retransmission, as specified in the AMQP 1.0 specification.
Raised when the remote peer indicates the link is open (i.e. attached in AMQP parlance).
Raised when the remote peer requests that the sender drain its credit, sending all available messages within the credit limit and calling set_drained(true). After this the sender has no credit left.
Raised when a flow is received for sender. sender_draining and sendable events may follow this event, so it only needs to be implemented if there are specific actions to be taken.
Raised when the remote peer closes the sender with an error. A sender_close event will always follow this event, so it only needs to be implemented if there are specific actions to be taken on a close with an error as opposed to a close. The error is available as an error property on the sender.
Raised when the remote peer indicates the link is closed (i.e. detached in AMQP parlance).
Raised when the remote peer settles the message.
A message is an object that may contain the following fields:
data_section, data_sections, sequence_section or sequence_sections from rhea.message.
Messages are passed to the send() method of Connection or Sender, and are made available as message on the event context for the message event on a Receiver or its parent(s).
The delivery object provides information on, and enables control over, the state of a message transfer.
The methods on a delivery object are:
If autoaccept is disabled on a receiver, the application should ensure that it accepts (or releases or rejects) all messages received.
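With autoaccept disabled, a message handler might settle each delivery explicitly, as in the sketch below. settleAfterProcessing and the work callback are hypothetical; choosing to release rather than reject on failure is a design choice made for this example.

```javascript
// `context` is the event context of a 'message' event; `work` is a hypothetical
// application callback that throws if the message cannot be handled.
function settleAfterProcessing(context, work) {
    try {
        work(context.message);
        // Processing succeeded: acknowledge receipt.
        context.delivery.accept();
        return 'accepted';
    } catch (err) {
        // Processing failed: release the message so the peer may deliver it again.
        context.delivery.release();
        return 'released';
    }
}

// Possible wiring, assuming `receiver` was opened with autoaccept set to false:
// receiver.on('message', (context) => settleAfterProcessing(context, handleMessage));
```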
Note: For detailed options and types, please refer to the type definitions in the typings directory.
FAQs
reactive AMQP 1.0 library
The npm package rhea receives a total of 267,737 weekly downloads. As such, rhea is classified as popular.
We found that rhea demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.